1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.annotation.Annotation;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.EnumMap;
28 import java.util.Iterator;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38 import org.apache.maven.surefire.report.ConsoleStream;
39 import org.apache.maven.surefire.testset.TestSetFailedException;
40 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41 import org.junit.internal.runners.ErrorReportingRunner;
42 import org.junit.runner.Description;
43 import org.junit.runner.Runner;
44 import org.junit.runner.manipulation.Filter;
45 import org.junit.runner.manipulation.NoTestsRemainException;
46 import org.junit.runner.notification.RunNotifier;
47 import org.junit.runners.ParentRunner;
48 import org.junit.runners.Suite;
49 import org.junit.runners.model.InitializationError;
50 import org.junit.runners.model.RunnerBuilder;
51
52 import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategy;
54 import static org.apache.maven.surefire.junitcore.pc.SchedulingStrategies.createParallelStrategyUnbounded;
55 import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
56 import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
57 import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public final class ParallelComputerBuilder
84 {
85 private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
86
87 private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
88
89 private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
90
91 static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
92
93 private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
94
95 private final ConsoleStream logger;
96
97 private boolean useSeparatePools;
98
99 private int totalPoolSize;
100
101 private JUnitCoreParameters parameters;
102
103 private boolean optimize;
104
105 private boolean runningInTests;
106
107
108
109
110
111
/**
 * Creates a builder which has a logger but no {@link JUnitCoreParameters}.
 * <p>
 * The builder is flagged as running in tests, separate pools are selected and the
 * thread count of suites, classes and methods starts at zero.
 *
 * @param logger console stream passed on to the schedulers created later
 */
ParallelComputerBuilder( ConsoleStream logger )
{
    this.logger = logger;
    this.runningInTests = true;
    useSeparatePools();
    for ( Type type : new Type[] { SUITES, CLASSES, METHODS } )
    {
        parallelGroups.put( type, 0 );
    }
}
121
/**
 * Creates a builder configured by the given parameters.
 * <p>
 * Delegates to the logger-only constructor and then clears the "running in tests" flag,
 * so that the thread counts are resolved from the parameters when the suite is built.
 *
 * @param logger     console stream passed on to the schedulers created later
 * @param parameters parameters used to resolve timeouts and concurrency
 */
public ParallelComputerBuilder( ConsoleStream logger, JUnitCoreParameters parameters )
{
    this( logger );
    this.parameters = parameters;
    this.runningInTests = false;
}
128
129 public ParallelComputer buildComputer()
130 {
131 return new PC();
132 }
133
134 ParallelComputerBuilder useSeparatePools()
135 {
136 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
137 useSeparatePools = true;
138 return this;
139 }
140
141 ParallelComputerBuilder useOnePool()
142 {
143 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
144 useSeparatePools = false;
145 return this;
146 }
147
148
149
150
151
152
153
154 ParallelComputerBuilder useOnePool( int totalPoolSize )
155 {
156 if ( totalPoolSize < 1 )
157 {
158 throw new IllegalArgumentException( "Size of common pool is less than 1." );
159 }
160 this.totalPoolSize = totalPoolSize;
161 useSeparatePools = false;
162 return this;
163 }
164
165 boolean isOptimized()
166 {
167 return optimize;
168 }
169
170 ParallelComputerBuilder optimize( boolean optimize )
171 {
172 this.optimize = optimize;
173 return this;
174 }
175
176 ParallelComputerBuilder parallelSuites()
177 {
178 return parallel( SUITES );
179 }
180
181 ParallelComputerBuilder parallelSuites( int nThreads )
182 {
183 return parallel( nThreads, SUITES );
184 }
185
186 ParallelComputerBuilder parallelClasses()
187 {
188 return parallel( CLASSES );
189 }
190
191 ParallelComputerBuilder parallelClasses( int nThreads )
192 {
193 return parallel( nThreads, CLASSES );
194 }
195
196 ParallelComputerBuilder parallelMethods()
197 {
198 return parallel( METHODS );
199 }
200
201 ParallelComputerBuilder parallelMethods( int nThreads )
202 {
203 return parallel( nThreads, METHODS );
204 }
205
206 private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
207 {
208 if ( nThreads < 0 )
209 {
210 throw new IllegalArgumentException( "negative nThreads " + nThreads );
211 }
212
213 if ( parallelType == null )
214 {
215 throw new IllegalArgumentException( "null parallelType" );
216 }
217
218 parallelGroups.put( parallelType, nThreads );
219 return this;
220 }
221
222 private ParallelComputerBuilder parallel( Type parallelType )
223 {
224 return parallel( Integer.MAX_VALUE, parallelType );
225 }
226
227 private double parallelTestsTimeoutInSeconds()
228 {
229 return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
230 }
231
232 private double parallelTestsTimeoutForcedInSeconds()
233 {
234 return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
235 }
236
237 @SuppressWarnings( "unchecked" )
238 private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
239 {
240 try
241 {
242 Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
243 return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
244 }
245 catch ( ClassNotFoundException e )
246 {
247 return null;
248 }
249 }
250
251 final class PC
252 extends ParallelComputer
253 {
254 private final SingleThreadScheduler notThreadSafeTests =
255 new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
256
257 private final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
258
259 private final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
260
261 private final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
262
263 private final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
264
265 private final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
266
267 private int poolCapacity;
268
269 private boolean splitPool;
270
271 private final Map<Type, Integer> allGroups;
272
273 private long nestedClassesChildren;
274
275 private volatile Scheduler master;
276
/**
 * Takes a snapshot of the enclosing builder's configuration: timeouts, thread counts per
 * parallel type, total pool size and the "separate pools" setting.
 */
private PC()
{
    super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
    splitPool = ParallelComputerBuilder.this.useSeparatePools;
    poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
    allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
}
284
285 Collection<ParentRunner> getSuites()
286 {
287 return suites;
288 }
289
290 Collection<ParentRunner> getNestedSuites()
291 {
292 return nestedSuites;
293 }
294
295 Collection<ParentRunner> getClasses()
296 {
297 return classes;
298 }
299
300 Collection<ParentRunner> getNestedClasses()
301 {
302 return nestedClasses;
303 }
304
305 Collection<Runner> getNotParallelRunners()
306 {
307 return notParallelRunners;
308 }
309
310 int getPoolCapacity()
311 {
312 return poolCapacity;
313 }
314
315 boolean isSplitPool()
316 {
317 return splitPool;
318 }
319
320 @Override
321 protected ShutdownResult describeStopped( boolean shutdownNow )
322 {
323 ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
324 final Scheduler m = master;
325 if ( m != null )
326 {
327 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
328 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
329 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
330 }
331 return shutdownResult;
332 }
333
334 @Override
335 boolean shutdownThreadPoolsAwaitingKilled()
336 {
337 boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
338 final Scheduler m = master;
339 if ( m != null )
340 {
341 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
342 }
343 return notInterrupted;
344 }
345
346 @Override
347 public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
348 throws InitializationError
349 {
350 try
351 {
352 super.getSuite( builder, cls );
353 populateChildrenFromSuites();
354
355 WrappedRunners suiteSuites = wrapRunners( suites );
356 WrappedRunners suiteClasses = wrapRunners( classes );
357
358 long suitesCount = suites.size();
359 long classesCount = classes.size() + nestedClasses.size();
360 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
361 if ( !ParallelComputerBuilder.this.runningInTests )
362 {
363 determineThreadCounts( suitesCount, classesCount, methodsCount );
364 }
365
366 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
367 }
368 catch ( TestSetFailedException e )
369 {
370 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
371 }
372 }
373
374 @Override
375 protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
376 throws Throwable
377 {
378 Runner runner = super.getRunner( builder, testClass );
379 if ( canSchedule( runner ) )
380 {
381 if ( !isThreadSafe( runner ) )
382 {
383 ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
384 notParallelRunners.add( runner );
385 }
386 else if ( runner instanceof Suite )
387 {
388 suites.add( (Suite) runner );
389 }
390 else
391 {
392 classes.add( (ParentRunner) runner );
393 }
394 }
395 else
396 {
397 notParallelRunners.add( runner );
398 }
399 return runner;
400 }
401
402 private void determineThreadCounts( long suites, long classes, long methods )
403 throws TestSetFailedException
404 {
405 RunnerCounter counts = null;
406 if ( ParallelComputerBuilder.this.optimize )
407 {
408 counts = new RunnerCounter( suites, classes, methods );
409 }
410 Concurrency concurrency =
411 resolveConcurrency( ParallelComputerBuilder.this.parameters, counts );
412 allGroups.put( SUITES, concurrency.suites );
413 allGroups.put( CLASSES, concurrency.classes );
414 allGroups.put( METHODS, concurrency.methods );
415 poolCapacity = concurrency.capacity;
416 splitPool &= concurrency.capacity <= 0;
417 }
418
419 private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
420 throws InitializationError
421 {
422
423 long childrenCounter = 0;
424 ArrayList<Runner> runs = new ArrayList<Runner>();
425 for ( T runner : runners )
426 {
427 if ( runner != null )
428 {
429 int children = countChildren( runner );
430 childrenCounter += children;
431 if ( children != 0 )
432 {
433 runs.add( runner );
434 }
435 }
436 }
437 return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
438 }
439
440 private int countChildren( Runner runner )
441 {
442 Description description = runner.getDescription();
443 Collection children = description == null ? null : description.getChildren();
444 return children == null ? 0 : children.size();
445 }
446
447 private ExecutorService createPool( int poolSize )
448 {
449 return poolSize < Integer.MAX_VALUE
450 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
451 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
452 }
453
454 private Scheduler createMaster( ExecutorService pool, int poolSize )
455 {
456
457 final int finalRunnersCounter = countFinalRunners();
458
459 final SchedulingStrategy strategy;
460 if ( finalRunnersCounter <= 1 || poolSize <= 1 )
461 {
462 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
463 }
464 else if ( pool != null && poolSize == Integer.MAX_VALUE )
465 {
466 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
467 }
468 else
469 {
470 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, finalRunnersCounter );
471 }
472 return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
473 }
474
475 private int countFinalRunners()
476 {
477 int counter = notParallelRunners.isEmpty() ? 0 : 1;
478
479 if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
480 {
481 ++counter;
482 }
483
484 if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
485 {
486 ++counter;
487 }
488
489 return counter;
490 }
491
492 private void populateChildrenFromSuites()
493 {
494
495 Filter filter = new SuiteFilter();
496 for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
497 {
498 ParentRunner suite = it.next();
499 try
500 {
501 suite.filter( filter );
502 }
503 catch ( NoTestsRemainException e )
504 {
505 it.remove();
506 }
507 }
508 }
509
510 private int totalPoolSize()
511 {
512 if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
513 {
514 int total = 0;
515 for ( int nThreads : allGroups.values() )
516 {
517 total += nThreads;
518 if ( total < 0 )
519 {
520 total = Integer.MAX_VALUE;
521 break;
522 }
523 }
524 return total;
525 }
526 else
527 {
528 return poolCapacity;
529 }
530 }
531
532 private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
533 throws InitializationError
534 {
535 int parallelSuites = allGroups.get( SUITES );
536 int parallelClasses = allGroups.get( CLASSES );
537 int parallelMethods = allGroups.get( METHODS );
538 int poolSize = totalPoolSize();
539 ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
540 master = createMaster( commonPool, poolSize );
541
542 if ( suiteSuites != null )
543 {
544
545 if ( commonPool != null && parallelSuites > 0 )
546 {
547 Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
548 suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
549 }
550 else
551 {
552 suiteSuites.setScheduler( createScheduler( parallelSuites ) );
553 }
554 }
555
556
557 ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
558 allSuites.addAll( nestedSuites );
559 if ( suiteClasses != null )
560 {
561 allSuites.add( suiteClasses );
562 }
563 if ( !allSuites.isEmpty() )
564 {
565 setSchedulers( allSuites, parallelClasses, commonPool );
566 }
567
568
569 ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
570 allClasses.addAll( nestedClasses );
571 if ( !allClasses.isEmpty() )
572 {
573 setSchedulers( allClasses, parallelMethods, commonPool );
574 }
575
576
577 ParentRunner all = createFinalRunner( removeNullRunners(
578 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
579 ) );
580 all.setScheduler( master );
581 return all;
582 }
583
584 private ParentRunner createFinalRunner( List<Runner> runners )
585 throws InitializationError
586 {
587 return new Suite( null, runners )
588 {
589 @Override
590 public void run( RunNotifier notifier )
591 {
592 try
593 {
594 beforeRunQuietly();
595 super.run( notifier );
596 }
597 finally
598 {
599 afterRunQuietly();
600 }
601 }
602 };
603 }
604
605 private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
606 {
607 if ( commonPool != null )
608 {
609 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
610 boolean doParallel = poolSize > 0;
611 for ( ParentRunner runner : runners )
612 {
613 runner.setScheduler(
614 createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
615 }
616 }
617 else
618 {
619 ExecutorService pool = null;
620 if ( poolSize == Integer.MAX_VALUE )
621 {
622 pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
623 }
624 else if ( poolSize > 0 )
625 {
626 pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
627 }
628 boolean doParallel = pool != null;
629 for ( ParentRunner runner : runners )
630 {
631 runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
632 BalancerFactory.createInfinitePermitsBalancer() ) );
633 }
634 }
635 }
636
637 private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
638 Balancer concurrency )
639 {
640 SchedulingStrategy strategy =
641 doParallel & pool != null
642 ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
643 : new InvokerStrategy( ParallelComputerBuilder.this.logger );
644 return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
645 }
646
647 private Scheduler createScheduler( int poolSize )
648 {
649 final SchedulingStrategy strategy;
650 if ( poolSize == Integer.MAX_VALUE )
651 {
652 strategy = createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
653 }
654 else if ( poolSize == 0 )
655 {
656 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
657 }
658 else
659 {
660 strategy = createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
661 }
662 return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
663 }
664
665 private boolean canSchedule( Runner runner )
666 {
667 return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
668 }
669
670 private boolean isThreadSafe( Runner runner )
671 {
672 return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
673 }
674
675 private class SuiteFilter
676 extends Filter
677 {
678
679
680 @Override
681 public boolean shouldRun( Description description )
682 {
683 return true;
684 }
685
686 @Override
687 public void apply( Object child )
688 throws NoTestsRemainException
689 {
690 super.apply( child );
691 if ( child instanceof ParentRunner )
692 {
693 ParentRunner runner = ( ParentRunner ) child;
694 if ( !isThreadSafe( runner ) )
695 {
696 runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
697 }
698 else if ( child instanceof Suite )
699 {
700 nestedSuites.add( (Suite) child );
701 }
702 else
703 {
704 ParentRunner parentRunner = (ParentRunner) child;
705 nestedClasses.add( parentRunner );
706 nestedClassesChildren += parentRunner.getDescription().getChildren().size();
707 }
708 }
709 }
710
711 @Override
712 public String describe()
713 {
714 return "";
715 }
716 }
717 }
718
719 private static Suite createSuite( Collection<Runner> runners )
720 throws InitializationError
721 {
722 final List<Runner> onlyRunners = removeNullRunners( runners );
723 return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
724 {
725 };
726 }
727
728 private static List<Runner> removeNullRunners( Collection<Runner> runners )
729 {
730 final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
731 onlyRunners.removeAll( NULL_SINGLETON );
732 return onlyRunners;
733 }
734 }